import asyncio
import json
import os
import ssl
import threading
from typing import TYPE_CHECKING, Optional, Dict, Sequence, Tuple, Iterable, List
from decimal import Decimal
import math
import time

import attr
import aiohttp

from electrum_ecc import ECPrivkey

import electrum_aionostr as aionostr
import electrum_aionostr.key
from electrum_aionostr.event import Event
from electrum_aionostr.util import to_nip19

from collections import defaultdict


from .i18n import _
from .logging import Logger
from .crypto import sha256, ripemd
from .bitcoin import (script_to_p2wsh, opcodes, dust_threshold, DummyAddress, construct_witness,
                      construct_script, address_to_script)
from . import bitcoin
from .transaction import (
    PartialTxInput, PartialTxOutput, PartialTransaction, Transaction, TxInput, TxOutpoint, script_GetOp,
    match_script_against_template, OPPushDataGeneric, OPPushDataPubkey, TxOutput,
)
from .util import (
    log_exceptions, ignore_exceptions, BelowDustLimit, OldTaskGroup, ca_path, gen_nostr_ann_pow,
    get_nostr_ann_pow_amount, make_aiohttp_proxy_connector, get_running_loop, get_asyncio_loop, wait_for2,
    run_sync_function_on_asyncio_thread, trigger_callback, NoDynamicFeeEstimates, UserFacingException,
)
from . import lnutil
from .lnutil import hex_to_bytes, REDEEM_AFTER_DOUBLE_SPENT_DELAY, Keypair
from .lnaddr import lndecode
from .json_db import StoredObject, stored_in
from . import constants
from .address_synchronizer import (TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE, TX_HEIGHT_UNCONFIRMED,
                                   TX_HEIGHT_UNCONF_PARENT)
from .fee_policy import FeePolicy
from .invoices import Invoice, PR_PAID
from .lnonion import OnionRoutingFailure, OnionFailureCode
from .lnsweep import SweepInfo


if TYPE_CHECKING:
    from .network import Network
    from .wallet import Abstract_Wallet
    from .lnwatcher import LNWatcher
    from .lnworker import LNWallet
    from .lnchannel import Channel
    from .simple_config import SimpleConfig
    from aiohttp_socks import ProxyConnector


SWAP_TX_SIZE = 150  # default tx size, used for mining fee estimation

MIN_SWAP_AMOUNT_SAT = 20_000
MIN_LOCKTIME_DELTA = 60
LOCKTIME_DELTA_REFUND = 70
MAX_LOCKTIME_DELTA = 100
MIN_FINAL_CLTV_DELTA_FOR_CLIENT = 3 * 144  # note: put in invoice, but is not enforced by receiver in lnpeer.py
assert MIN_LOCKTIME_DELTA <= LOCKTIME_DELTA_REFUND <= MAX_LOCKTIME_DELTA
assert MAX_LOCKTIME_DELTA < lnutil.MIN_FINAL_CLTV_DELTA_ACCEPTED
assert MAX_LOCKTIME_DELTA < MIN_FINAL_CLTV_DELTA_FOR_CLIENT


# The script of the reverse swaps has one extra check in it to verify
# that the length of the preimage is 32. This is required because in
# the reverse swaps the preimage is generated by the user and to
# settle the hold invoice, you need a preimage with 32 bytes . If that
# check wasn't there the user could generate a preimage with a
# different length which would still allow for claiming the onchain
# coins but the invoice couldn't be settled

# Unified witness-script for all swaps.  Historically with Boltz-backend, this was the reverse-swap script.
WITNESS_TEMPLATE_SWAP = [
    opcodes.OP_SIZE,
    OPPushDataGeneric(None),               # idx 1. length of preimage
    opcodes.OP_EQUAL,
    opcodes.OP_IF,
    opcodes.OP_HASH160,
    OPPushDataGeneric(lambda x: x == 20),  # idx 5. payment_hash
    opcodes.OP_EQUALVERIFY,
    OPPushDataPubkey,                      # idx 7. claim_pubkey
    opcodes.OP_ELSE,
    opcodes.OP_DROP,
    OPPushDataGeneric(None),               # idx 10. locktime
    opcodes.OP_CHECKLOCKTIMEVERIFY,
    opcodes.OP_DROP,
    OPPushDataPubkey,                      # idx 13. refund_pubkey
    opcodes.OP_ENDIF,
    opcodes.OP_CHECKSIG
]


def _check_swap_scriptcode(
    *,
    redeem_script: bytes,
    lockup_address: str,
    payment_hash: bytes,
    locktime: int,
    refund_pubkey: Optional[bytes],   # note: We don't need to check the counterparty's key.
    claim_pubkey: Optional[bytes],    #       Can use None in that case.
) -> None:
    assert (refund_pubkey is not None) or (claim_pubkey is not None), "at least one pubkey must be set"
    parsed_script = [x for x in script_GetOp(redeem_script)]
    if not match_script_against_template(redeem_script, WITNESS_TEMPLATE_SWAP):
        raise Exception("rswap check failed: scriptcode does not match template")
    if script_to_p2wsh(redeem_script) != lockup_address:
        raise Exception("rswap check failed: inconsistent scriptcode and address")
    if ripemd(payment_hash) != parsed_script[5][1]:
        raise Exception("rswap check failed: our preimage not in script")
    claim_pubkey = claim_pubkey or parsed_script[7][1]
    if claim_pubkey != parsed_script[7][1]:
        raise Exception("rswap check failed: our pubkey not in script")
    refund_pubkey = refund_pubkey or parsed_script[13][1]
    if refund_pubkey != parsed_script[13][1]:
        raise Exception("rswap check failed: our pubkey not in script")
    if locktime != int.from_bytes(parsed_script[10][1], byteorder='little'):
        raise Exception("rswap check failed: inconsistent locktime and script")
    # let's just rebuild the full script from scratch...
    if redeem_script != _construct_swap_scriptcode(
        payment_hash=payment_hash,
        locktime=locktime,
        refund_pubkey=refund_pubkey,
        claim_pubkey=claim_pubkey,
    ):
        raise Exception("failed to rebuild swap script from scratch")


def _construct_swap_scriptcode(
    payment_hash: bytes,
    locktime: int,
    refund_pubkey: bytes,
    claim_pubkey: bytes,
) -> bytes:
    assert isinstance(payment_hash, bytes) and len(payment_hash) == 32
    assert isinstance(locktime, int) and (0 <= locktime <= bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX)
    assert isinstance(refund_pubkey, bytes) and len(refund_pubkey) == 33
    assert isinstance(claim_pubkey, bytes) and len(claim_pubkey) == 33
    return construct_script(
        WITNESS_TEMPLATE_SWAP,
        values={1: 32, 5: ripemd(payment_hash), 7: claim_pubkey, 10: locktime, 13: refund_pubkey}
    )


class SwapServerError(Exception):
    def __init__(self, message=None):
        self.message = message
        super().__init__(message)

    def __str__(self):
        if self.message:
            return self.message
        return _("The swap server errored or is unreachable.")


def now():
    return int(time.time())


@attr.s(frozen=True)
class SwapFees:
    percentage = attr.ib(type=int)
    mining_fee = attr.ib(type=int)
    min_amount = attr.ib(type=int)
    max_forward = attr.ib(type=int)
    max_reverse = attr.ib(type=int)


@attr.frozen
class SwapOffer:
    pairs = attr.ib(type=SwapFees)
    relays = attr.ib(type=list[str])
    pow_bits = attr.ib(type=int)
    server_pubkey = attr.ib(type=str)
    timestamp = attr.ib(type=int)

    @property
    def server_npub(self):
        return to_nip19('npub', self.server_pubkey)


@stored_in('submarine_swaps')
@attr.s
class SwapData(StoredObject):
    is_reverse = attr.ib(type=bool)  # for whoever is running code (PoV of client or server)
    locktime = attr.ib(type=int)  # onchain, abs
    onchain_amount = attr.ib(type=int)  # in sats
    lightning_amount = attr.ib(type=int)  # in sats
    redeem_script = attr.ib(type=bytes, converter=hex_to_bytes)
    preimage = attr.ib(type=Optional[bytes], converter=hex_to_bytes)
    prepay_hash = attr.ib(type=Optional[bytes], converter=hex_to_bytes)
    privkey = attr.ib(type=bytes, converter=hex_to_bytes)
    lockup_address = attr.ib(type=str)
    claim_to_output = attr.ib(type=Optional[Tuple[str, int]])  # address, amount to claim the funding utxo to
    funding_txid = attr.ib(type=Optional[str])
    spending_txid = attr.ib(type=Optional[str])
    is_redeemed = attr.ib(type=bool)

    _funding_prevout = None  # type: Optional[TxOutpoint]  # for RBF
    _payment_hash = None
    _payment_pending = False # for forward swaps

    @property
    def payment_hash(self) -> bytes:
        return self._payment_hash

    def is_funded(self) -> bool:
        return self._payment_pending or bool(self.funding_txid)


def pubkey_to_rgb_color(swapserver_pubkey: str) -> Tuple[int, int, int]:
    assert isinstance(swapserver_pubkey, str), type(swapserver_pubkey)
    assert len(swapserver_pubkey) == 64, len(swapserver_pubkey)
    input_hash = int.from_bytes(sha256(swapserver_pubkey), byteorder="big")
    r = (input_hash & 0xFF0000) >> 16
    g = (input_hash & 0x00FF00) >> 8
    b = input_hash & 0x0000FF
    return r, g, b


class SwapManager(Logger):

    network: Optional['Network'] = None
    lnwatcher: Optional['LNWatcher'] = None

    def __init__(self, *, wallet: 'Abstract_Wallet', lnworker: 'LNWallet'):
        Logger.__init__(self)
        self.mining_fee = None
        self.percentage = None
        self._min_amount = None
        self._max_forward = None
        self._max_reverse = None

        self.wallet = wallet
        self.config = wallet.config
        self.lnworker = lnworker
        self.lnwatcher = self.lnworker.lnwatcher
        self.config = wallet.config
        self.taskgroup = OldTaskGroup()
        self.dummy_address = DummyAddress.SWAP

        # note: accessing swaps dicts (besides simple lookup) needs swaps_lock
        self.swaps_lock = threading.Lock()
        self._swaps = self.wallet.db.get_dict('submarine_swaps')  # type: Dict[str, SwapData]
        self._swaps_by_funding_outpoint = {}  # type: Dict[TxOutpoint, SwapData]
        self._swaps_by_lockup_address = {}  # type: Dict[str, SwapData]
        for payment_hash_hex, swap in self._swaps.items():
            payment_hash = bytes.fromhex(payment_hash_hex)
            swap._payment_hash = payment_hash
            self._add_or_reindex_swap(swap, is_new=False)
            if not swap.is_reverse and not swap.is_redeemed and not self.lnworker.get_preimage(swap.payment_hash):
                self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback)

        self._prepayments = {}  # type: Dict[bytes, bytes] # fee_rhash -> rhash
        for k, swap in self._swaps.items():
            if swap.prepay_hash is not None:
                self._prepayments[swap.prepay_hash] = bytes.fromhex(k)
        self.is_server = False # overridden by swapserver plugin if enabled
        self.is_initialized = asyncio.Event()
        self.pairs_updated = asyncio.Event()

    def start_network(self, network: 'Network'):
        assert network
        if self.network is not None:
            self.logger.info('start_network: already started')
            return
        self.logger.info('start_network: starting main loop')
        self.network = network
        with self.swaps_lock:
            swaps_items = list(self._swaps.items())
        for k, swap in swaps_items:
            if swap.is_redeemed:
                continue
            self.add_lnwatcher_callback(swap)
        asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)

    @log_exceptions
    async def run_nostr_server(self):
        await self.set_nostr_proof_of_work()

        while self.wallet.has_password() and self.wallet.get_unlocked_password() is None:
            self.logger.info("This wallet is password-protected. Please unlock it to start the swapserver plugin")
            await asyncio.sleep(10)

        with NostrTransport(self.config, self, self.lnworker.nostr_keypair) as transport:
            await transport.is_connected.wait()
            self.logger.info(f'nostr is connected')
            # will publish a new announcement if liquidity changed or every OFFER_UPDATE_INTERVAL_SEC
            last_update = time.time()
            while True:
                await asyncio.sleep(transport.LIQUIDITY_UPDATE_INTERVAL_SEC)

                previous_max_forward = self._max_forward
                previous_max_reverse = self._max_reverse
                previous_mining_fee = self.mining_fee
                try:
                    self.server_update_pairs()
                except Exception:
                    self.logger.exception("server_update_pairs failed")
                    continue

                liquidity_changed = self._max_forward != previous_max_forward \
                                        or self._max_reverse != previous_max_reverse
                mining_fees_changed = self.mining_fee != previous_mining_fee
                if liquidity_changed or mining_fees_changed:
                    self.logger.debug(f"updating announcement: {liquidity_changed=}, {mining_fees_changed=}")
                elif time.time() - last_update < transport.OFFER_UPDATE_INTERVAL_SEC:
                    continue

                await transport.publish_offer(self)
                last_update = time.time()

    @log_exceptions
    async def main_loop(self):
        tasks = [self.pay_pending_invoices()]
        if self.is_server:
            # nostr and http are not mutually exclusive
            if self.config.SWAPSERVER_PORT:
                tasks.append(self.http_server.run())
            if self.config.NOSTR_RELAYS:
                tasks.append(self.run_nostr_server())

        async with self.taskgroup as group:
            for task in tasks:
                await group.spawn(task)

    async def stop(self):
        await self.taskgroup.cancel_remaining()

    def create_transport(self) -> 'SwapServerTransport':
        from .lnutil import generate_random_keypair
        if self.config.SWAPSERVER_URL:
            return HttpTransport(self.config, self)
        else:
            keypair = self.lnworker.nostr_keypair if self.is_server else generate_random_keypair()
            return NostrTransport(self.config, self, keypair)

    async def set_nostr_proof_of_work(self) -> None:
        current_pow = get_nostr_ann_pow_amount(
            self.lnworker.nostr_keypair.pubkey[1:],
            self.config.SWAPSERVER_ANN_POW_NONCE
        )
        if current_pow >= self.config.SWAPSERVER_POW_TARGET:
            self.logger.debug(f"Reusing existing PoW nonce for nostr announcement.")
            return

        self.logger.info(f"Generating PoW for nostr announcement. Target: {self.config.SWAPSERVER_POW_TARGET}")
        nonce, pow_amount = await gen_nostr_ann_pow(
            self.lnworker.nostr_keypair.pubkey[1:],  # pubkey without prefix
            self.config.SWAPSERVER_POW_TARGET,
        )
        self.logger.debug(f"Found {pow_amount} bits of work for Nostr announcement.")
        self.config.SWAPSERVER_ANN_POW_NONCE = nonce

    async def pay_invoice(self, key):
        self.logger.info(f'trying to pay invoice {key}')
        self.invoices_to_pay[key] = 1000000000000 # lock
        try:
            invoice = self.wallet.get_invoice(key)
            success, log = await self.lnworker.pay_invoice(invoice)
        except Exception as e:
            self.logger.info(f'exception paying {key}, will not retry')
            self.invoices_to_pay.pop(key, None)
            return
        if not success:
            self.logger.info(f'failed to pay {key}, will retry in 10 minutes')
            self.invoices_to_pay[key] = now() + 600
        else:
            self.logger.info(f'paid invoice {key}')
            self.invoices_to_pay.pop(key, None)

    async def pay_pending_invoices(self):
        self.invoices_to_pay = {}
        while True:
            await asyncio.sleep(5)
            for key, not_before in list(self.invoices_to_pay.items()):
                if now() < not_before:
                    continue
                await self.taskgroup.spawn(self.pay_invoice(key))

    def cancel_normal_swap(self, swap: SwapData):
        """ we must not have broadcast the funding tx """
        if swap is None:
            return
        if swap.is_funded():
            self.logger.info(f'cannot cancel swap {swap.payment_hash.hex()}: already funded')
            return
        self._fail_swap(swap, 'user cancelled')

    def _fail_swap(self, swap: SwapData, reason: str):
        self.logger.info(f'failing swap {swap.payment_hash.hex()}: {reason}')
        if not swap.is_reverse and swap.payment_hash in self.lnworker.hold_invoice_callbacks:
            # unregister_hold_invoice will fail pending htlcs if there is no preimage available
            self.lnworker.unregister_hold_invoice(swap.payment_hash)
            self.lnworker.delete_payment_info(swap.payment_hash.hex(), direction=lnutil.RECEIVED)
            self.lnworker.clear_invoices_cache()
        self.lnwatcher.remove_callback(swap.lockup_address)
        if not swap.is_funded():
            with self.swaps_lock:
                if self._swaps.pop(swap.payment_hash.hex(), None) is None:
                    self.logger.debug(f"swap {swap.payment_hash.hex()} has already been deleted.")
                if swap._funding_prevout is not None:
                    self._swaps_by_funding_outpoint.pop(swap._funding_prevout, None)
                self._swaps_by_lockup_address.pop(swap.lockup_address, None)
                if swap.prepay_hash is not None:
                    self._prepayments.pop(swap.prepay_hash, None)
                    if self.lnworker.get_payment_status(swap.prepay_hash, direction=lnutil.RECEIVED) != PR_PAID:
                        self.lnworker.delete_payment_info(swap.prepay_hash.hex(), direction=lnutil.RECEIVED)
                        self.lnworker.delete_payment_bundle(payment_hash=swap.payment_hash)
                    if self.lnworker.get_payment_status(swap.prepay_hash, direction=lnutil.SENT) != PR_PAID:
                        self.lnworker.delete_payment_info(swap.prepay_hash.hex(), direction=lnutil.SENT)
                if self.lnworker.get_payment_status(swap.payment_hash, direction=lnutil.SENT) != PR_PAID:
                    self.lnworker.delete_payment_info(swap.payment_hash.hex(), direction=lnutil.SENT)

    @classmethod
    def extract_preimage(cls, swap: SwapData, claim_tx: Transaction) -> Optional[bytes]:
        for txin in claim_tx.inputs():
            witness = txin.witness_elements()
            if not witness or len(witness) < 2:
                # tx may be unsigned
                continue
            preimage = witness[1]
            if sha256(preimage) == swap.payment_hash:
                return preimage
        return None

    @log_exceptions
    async def _claim_swap(self, swap: SwapData) -> None:
        assert self.network
        assert self.lnwatcher
        if not self.lnwatcher.adb.is_up_to_date():
            return
        current_height = self.network.get_local_height()
        remaining_time = swap.locktime - current_height
        txos = self.lnwatcher.adb.get_addr_outputs(swap.lockup_address)

        for txin in txos.values():
            if swap.is_reverse and txin.value_sats() < swap.onchain_amount:
                # amount too low, we must not reveal the preimage
                continue
            break
        else:
            # swap not funded.
            txin = None
            # if it is a normal swap, we might have double spent the funding tx
            # in that case we need to fail the HTLCs
            if remaining_time <= 0:
                self._fail_swap(swap, 'expired')

        if txin:
            # the swap is funded
            # note: swap.funding_txid can change due to RBF, it will get updated here:
            swap.funding_txid = txin.prevout.txid.hex()
            swap._funding_prevout = txin.prevout
            self._add_or_reindex_swap(swap, is_new=False)  # to update _swaps_by_funding_outpoint
            funding_height = self.lnwatcher.adb.get_tx_height(txin.prevout.txid.hex())
            spent_height = txin.spent_height
            # set spending_txid (even if tx is local), for GUI grouping
            swap.spending_txid = txin.spent_txid
            # discard local spenders
            if spent_height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
                spent_height = None
            if spent_height is not None:
                if spent_height > 0 and swap.preimage:
                    if current_height - spent_height > REDEEM_AFTER_DOUBLE_SPENT_DELAY:
                        self.logger.info(f'stop watching swap {swap.lockup_address}')
                        swap.is_redeemed = True
                        # cleanup
                        self.lnwatcher.remove_callback(swap.lockup_address)
                        if not swap.is_reverse:
                            self.lnworker.delete_payment_bundle(payment_hash=swap.payment_hash)
                            self.lnworker.unregister_hold_invoice(swap.payment_hash)

            if not swap.is_reverse:
                if swap.preimage is None and spent_height is not None:
                    # extract the preimage, add it to lnwatcher
                    claim_tx = self.lnwatcher.adb.get_transaction(txin.spent_txid)
                    preimage = self.extract_preimage(swap, claim_tx)
                    if preimage:
                        swap.preimage = preimage
                        self.logger.info(f'found preimage: {preimage.hex()}')
                        self.lnworker.save_preimage(swap.payment_hash, preimage)
                    else:
                        # this is our refund tx
                        if spent_height > 0:
                            self.logger.info(f'refund tx confirmed: {txin.spent_txid} {spent_height}')
                            self._fail_swap(swap, 'refund tx confirmed')
                            return
                if remaining_time > 0:
                    # too early for refund
                    return
                if swap.preimage:
                    # we have been paid. do not try to get refund.
                    return
            else:
                if swap.preimage is None:
                    swap.preimage = self.lnworker.get_preimage(swap.payment_hash)
                if swap.preimage is None:
                    if funding_height.conf <= 0:
                        return
                    key = swap.payment_hash.hex()
                    if remaining_time <= MIN_LOCKTIME_DELTA:
                        if key in self.invoices_to_pay:
                            # fixme: should consider cltv of ln payment
                            self.logger.info(f'locktime too close {key} {remaining_time}')
                            self.invoices_to_pay.pop(key, None)
                        return
                    if key not in self.invoices_to_pay:
                        self.invoices_to_pay[key] = 0
                    return

                if self.network.config.TEST_SWAPSERVER_REFUND:
                    # for testing: do not create claim tx
                    return

            if spent_height is not None and spent_height > 0:
                return
            txin, locktime = self.create_claim_txin(txin=txin, swap=swap)
            if swap.is_reverse and swap.claim_to_output:
                asyncio.create_task(self._claim_to_output(swap, txin))
                return
            # note: there is no csv in the script, we just set this so that txbatcher waits for one confirmation
            name = 'swap claim' if swap.is_reverse else 'swap refund'
            can_be_batched = True
            sweep_info = SweepInfo(
                txin=txin,
                cltv_abs=locktime,
                txout=None,
                name=name,
                can_be_batched=can_be_batched,
                dust_override=False,
            )
            try:
                self.wallet.txbatcher.add_sweep_input('swaps', sweep_info)
            except BelowDustLimit:
                self.logger.info('utxo value below dust threshold')
                return
            except NoDynamicFeeEstimates:
                self.logger.info('got NoDynamicFeeEstimates')
                return

    async def _claim_to_output(self, swap: SwapData, claim_txin: PartialTxInput):
        """
        Construct claim tx that spends exactly the funding utxo to the swap output, independent of the
        current fee environment to guarantee the correct amount is being sent to the claim output which
        might be an external address.
        """
        assert swap.claim_to_output, swap
        txout = PartialTxOutput.from_address_and_value(swap.claim_to_output[0], swap.claim_to_output[1])
        tx = PartialTransaction.from_io([claim_txin], [txout])
        funding_tx_confirmed = self.wallet.adb.get_tx_height(swap.funding_txid).height() > TX_HEIGHT_UNCONFIRMED
        already_broadcast = self.wallet.adb.get_tx_height(tx.txid()).height() >= TX_HEIGHT_UNCONF_PARENT
        self.logger.debug(f"_claim_to_output: {funding_tx_confirmed=} {already_broadcast=}")

        # add tx to db so it can be shown as future tx
        if not self.wallet.adb.get_transaction(tx.txid()):
            try:
                self.wallet.adb.add_transaction(tx)
            except Exception:
                self.logger.exception("")
                return
            trigger_callback('wallet_updated', self)

        # set or update future tx wanted height if it has not been broadcast yet
        local_height = self.network.get_local_height()
        wanted_height = local_height + claim_txin.get_block_based_relative_locktime()
        if not already_broadcast and self.wallet.adb.future_tx.get(tx.txid(), 0) < wanted_height:
            self.wallet.adb.set_future_tx(tx.txid(), wanted_height=wanted_height)

        if funding_tx_confirmed and not already_broadcast:
            tx = self.wallet.sign_transaction(tx, password=None, ignore_warnings=True)
            assert tx and tx.is_complete(), tx
            try:
                await self.wallet.network.broadcast_transaction(tx)
            except Exception:
                self.logger.exception(f"cannot broadcast swap to output claim tx")

    def get_fee_for_txbatcher(self):
        return self._get_tx_fee(self.config.FEE_POLICY_SWAPS)

    def _get_tx_fee(self, policy_descriptor: str):
        fee_policy = FeePolicy(policy_descriptor)
        return fee_policy.estimate_fee(SWAP_TX_SIZE, network=self.network, allow_fallback_to_static_rates=True)

    def _sanity_check_swap_costs(
        self,
        *,
        incoming_sat: int,
        outgoing_sat: int,
    ) -> None:
        """The user should have already seen the swap amounts, and hence the cost.
        These are just some last-minute sanity checks that the cost of the swap is not insane.
        """
        costs_abs = outgoing_sat - incoming_sat
        costs_ratio = 1 - incoming_sat / outgoing_sat
        if costs_abs < 10_000:  # "small" amounts are exempt from checks
            return
        exc = UserFacingException(_("Total swap costs are insane.") + f"\n({costs_ratio=}, {costs_abs=} sat)")
        if costs_ratio > 0.25:
            raise exc
        if costs_abs > 1_000_000:
            if costs_ratio > 0.15:
                raise exc

    def get_swap(self, payment_hash: bytes) -> Optional[SwapData]:
        # for history
        swap = self._swaps.get(payment_hash.hex())
        if swap:
            return swap
        payment_hash = self._prepayments.get(payment_hash)
        if payment_hash:
            return self._swaps.get(payment_hash.hex())
        return None

    def add_lnwatcher_callback(self, swap: SwapData) -> None:
        callback = lambda: self._claim_swap(swap)
        self.lnwatcher.add_callback(swap.lockup_address, callback)

    async def hold_invoice_callback(self, payment_hash: bytes) -> None:
        # note: this assumes the wallet has been unlocked
        key = payment_hash.hex()
        if swap := self._swaps.get(key):
            if not swap.is_funded():
                output = self.create_funding_output(swap)
                self.wallet.txbatcher.add_payment_output('swaps', output)
                swap._payment_pending = True
        else:
            self.logger.info(f'key not in swaps {key}')

    def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None):
        """ server method """
        assert lightning_amount_sat
        if payment_hash.hex() in self._swaps:
            raise Exception("payment_hash already in use")
        locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND
        if self.network.blockchain().is_tip_stale():
            raise Exception("our blockchain tip is stale")
        our_privkey = os.urandom(32)
        our_pubkey = ECPrivkey(our_privkey).get_public_key_bytes(compressed=True)
        onchain_amount_sat = self._get_recv_amount(lightning_amount_sat, is_reverse=True) # what the client is going to receive
        if not onchain_amount_sat:
            raise Exception("no onchain amount")
        redeem_script = _construct_swap_scriptcode(
            payment_hash=payment_hash,
            locktime=locktime,
            refund_pubkey=our_pubkey,
            claim_pubkey=their_pubkey,
        )
        swap, invoice, prepay_invoice = self.add_normal_swap(
            redeem_script=redeem_script,
            locktime=locktime,
            onchain_amount_sat=onchain_amount_sat,
            lightning_amount_sat=lightning_amount_sat,
            payment_hash=payment_hash,
            our_privkey=our_privkey,
            prepay=True,
        )
        self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback)
        return swap, invoice, prepay_invoice

    def add_normal_swap(
            self, *,
            redeem_script: bytes,
            locktime: int,  # onchain, abs
            onchain_amount_sat: int,
            lightning_amount_sat: int,
            payment_hash: bytes,
            our_privkey: bytes,
            prepay: bool,
            channels: Optional[Sequence['Channel']] = None,
            min_final_cltv_expiry_delta: Optional[int] = None,
    ) -> Tuple[SwapData, str, Optional[str]]:
        """creates a hold invoice"""
        if payment_hash.hex() in self._swaps:
            raise Exception("payment_hash already in use")
        if prepay:
            # server requests 2 * the mining fee as instantly settled prepayment so that the mining
            # fees of the funding tx and potential timeout refund tx are always covered
            prepay_amount_sat = self.mining_fee * 2
            invoice_amount_sat = lightning_amount_sat - prepay_amount_sat
        else:
            invoice_amount_sat = lightning_amount_sat

        # add payment info to lnworker
        self.lnworker.add_payment_info_for_hold_invoice(
            payment_hash,
            lightning_amount_sat=invoice_amount_sat,
            min_final_cltv_delta=min_final_cltv_expiry_delta or lnutil.MIN_FINAL_CLTV_DELTA_ACCEPTED,
            exp_delay=300,
        )
        info = self.lnworker.get_payment_info(payment_hash, direction=lnutil.RECEIVED)
        lnaddr1, invoice = self.lnworker.get_bolt11_invoice(
            payment_info=info,
            message='Submarine swap',
            fallback_address=None,
            channels=channels,
        )
        margin_to_get_refund_tx_mined = MIN_LOCKTIME_DELTA
        if not (locktime + margin_to_get_refund_tx_mined < self.network.get_local_height() + lnaddr1.get_min_final_cltv_delta()):
            raise Exception(
                f"onchain locktime ({locktime}+{margin_to_get_refund_tx_mined}) "
                f"too close to LN-htlc-expiry ({self.network.get_local_height()+lnaddr1.get_min_final_cltv_delta()})")

        if prepay:
            prepay_hash = self.lnworker.create_payment_info(
                amount_msat=prepay_amount_sat*1000,
                min_final_cltv_delta=min_final_cltv_expiry_delta or lnutil.MIN_FINAL_CLTV_DELTA_ACCEPTED,
                exp_delay=300,
            )
            info = self.lnworker.get_payment_info(prepay_hash, direction=lnutil.RECEIVED)
            lnaddr2, prepay_invoice = self.lnworker.get_bolt11_invoice(
                payment_info=info,
                message='Submarine swap prepayment',
                fallback_address=None,
                channels=channels,
            )
            self.lnworker.bundle_payments([payment_hash, prepay_hash])
            self._prepayments[prepay_hash] = payment_hash
            assert lnaddr1.get_min_final_cltv_delta() == lnaddr2.get_min_final_cltv_delta()
        else:
            prepay_invoice = None
            prepay_hash = None

        lockup_address = script_to_p2wsh(redeem_script)
        swap = SwapData(
            redeem_script=redeem_script,
            locktime=locktime,
            privkey=our_privkey,
            preimage=None,
            prepay_hash=prepay_hash,
            lockup_address=lockup_address,
            onchain_amount=onchain_amount_sat,
            claim_to_output=None,
            lightning_amount=lightning_amount_sat,
            is_reverse=False,
            is_redeemed=False,
            funding_txid=None,
            spending_txid=None,
        )
        swap._payment_hash = payment_hash
        self._add_or_reindex_swap(swap, is_new=True)
        self.add_lnwatcher_callback(swap)
        return swap, invoice, prepay_invoice

    def create_reverse_swap(self, *, lightning_amount_sat: int, their_pubkey: bytes) -> SwapData:
        """ server method. """
        assert lightning_amount_sat is not None
        locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND
        if self.network.blockchain().is_tip_stale():
            raise Exception("our blockchain tip is stale")
        privkey = os.urandom(32)
        our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True)
        onchain_amount_sat = self._get_send_amount(lightning_amount_sat, is_reverse=False)
        if not onchain_amount_sat:
            raise Exception("no onchain amount")
        preimage = os.urandom(32)
        payment_hash = sha256(preimage)
        redeem_script = _construct_swap_scriptcode(
            payment_hash=payment_hash,
            locktime=locktime,
            refund_pubkey=their_pubkey,
            claim_pubkey=our_pubkey,
        )
        swap = self.add_reverse_swap(
            redeem_script=redeem_script,
            locktime=locktime,
            privkey=privkey,
            preimage=preimage,
            payment_hash=payment_hash,
            prepay_hash=None,
            onchain_amount_sat=onchain_amount_sat,
            lightning_amount_sat=lightning_amount_sat)
        return swap

    def add_reverse_swap(
        self,
        *,
        redeem_script: bytes,
        locktime: int,  # onchain
        privkey: bytes,
        lightning_amount_sat: int,
        onchain_amount_sat: int,
        preimage: bytes,
        payment_hash: bytes,
        prepay_hash: Optional[bytes] = None,
        claim_to_output: Optional[TxOutput] = None,
    ) -> SwapData:
        if payment_hash.hex() in self._swaps:
            raise Exception("payment_hash already in use")
        assert sha256(preimage) == payment_hash
        lockup_address = script_to_p2wsh(redeem_script)
        if claim_to_output is not None:
            # the claim_to_output value needs to be lower than the funding utxo value, otherwise
            # there are no funds left for the fee of the claim tx
            assert claim_to_output.value < onchain_amount_sat, f"{claim_to_output=} >= {onchain_amount_sat=}"
            claim_to_output = (claim_to_output.address, claim_to_output.value)
        swap = SwapData(
            redeem_script=redeem_script,
            locktime=locktime,
            privkey=privkey,
            preimage=preimage,
            prepay_hash=prepay_hash,
            lockup_address=lockup_address,
            onchain_amount=onchain_amount_sat,
            claim_to_output=claim_to_output,
            lightning_amount=lightning_amount_sat,
            is_reverse=True,
            is_redeemed=False,
            funding_txid=None,
            spending_txid=None,
        )
        if prepay_hash:
            if prepay_hash in self._prepayments:
                raise Exception("prepay_hash already in use")
            self._prepayments[prepay_hash] = payment_hash
        swap._payment_hash = payment_hash
        self._add_or_reindex_swap(swap, is_new=True)
        self.add_lnwatcher_callback(swap)
        return swap

    def server_add_swap_invoice(self, request: dict) -> dict:
        """ server method.
        (client-forward-swap phase2)
        """
        invoice = request['invoice']
        invoice = Invoice.from_bech32(invoice)
        key = invoice.rhash
        payment_hash = bytes.fromhex(key)
        their_pubkey = bytes.fromhex(request['refundPublicKey'])
        with self.swaps_lock:
            assert key in self._swaps
            swap = self._swaps[key]
            assert swap.lightning_amount == int(invoice.get_amount_sat())
            assert swap.is_reverse is True
            # check that we have the preimage
            assert sha256(swap.preimage) == payment_hash
            assert swap.spending_txid is None
            # check their_pubkey by recalculating redeem_script
            our_pubkey = ECPrivkey(swap.privkey).get_public_key_bytes(compressed=True)
            redeem_script = _construct_swap_scriptcode(
                payment_hash=payment_hash, locktime=swap.locktime, refund_pubkey=their_pubkey, claim_pubkey=our_pubkey,
            )
            assert swap.redeem_script == redeem_script
            assert key not in self.invoices_to_pay
            self.invoices_to_pay[key] = 0
            assert self.wallet.get_invoice(invoice.get_id()) is None
            self.wallet.save_invoice(invoice)
        return {}

    async def normal_swap(
            self,
            *,
            transport: 'SwapServerTransport',
            lightning_amount_sat: int,
            expected_onchain_amount_sat: int,
            password,
            tx: PartialTransaction = None,
            channels = None,
    ) -> Optional[str]:
        """send on-chain BTC, receive on Lightning

        Old (removed) flow:
        - User generates an LN invoice with RHASH, and knows preimage.
        - User creates on-chain output locked to RHASH.
        - Server pays LN invoice. User reveals preimage.
        - Server spends the on-chain output using preimage.
        cltv safety requirement: (onchain_locktime > LN_locktime),   otherwise server is vulnerable

        New flow:
         - User requests swap  (RPC 'createnormalswap')
         - Server creates preimage, sends RHASH to user
         - User creates hold invoice, sends it to server  (RPC 'addswapinvoice')
         - Server sends HTLC, user holds it
         - User creates on-chain output locked to RHASH
         - Server spends the on-chain output using preimage (revealing the preimage)
         - User fulfills HTLC using preimage
        cltv safety requirement: (onchain_locktime < LN_locktime),   otherwise client is vulnerable
        """
        assert self.network
        assert self.lnwatcher
        swap, invoice = await self.request_normal_swap(
            transport=transport,
            lightning_amount_sat=lightning_amount_sat,
            expected_onchain_amount_sat=expected_onchain_amount_sat,
            channels=channels,
        )
        tx = self.create_funding_tx(swap, tx, password=password)
        return await self.wait_for_htlcs_and_broadcast(transport=transport, swap=swap, invoice=invoice, tx=tx)

    async def request_normal_swap(
            self,
            *,
            transport: 'SwapServerTransport',
            lightning_amount_sat: int,
            expected_onchain_amount_sat: int,
            channels: Optional[Sequence['Channel']] = None,
    ) -> Tuple[SwapData, str]:
        self._sanity_check_swap_costs(
            incoming_sat=lightning_amount_sat, outgoing_sat=expected_onchain_amount_sat)
        await self.is_initialized.wait() # add timeout
        refund_privkey = os.urandom(32)
        refund_pubkey = ECPrivkey(refund_privkey).get_public_key_bytes(compressed=True)
        self.logger.info('requesting preimage hash for swap')
        request_data = {
            "invoiceAmount": lightning_amount_sat,
            "refundPublicKey": refund_pubkey.hex()
        }
        data = await transport.send_request_to_server('createnormalswap', request_data)
        try:
            payment_hash = bytes.fromhex(data["preimageHash"])
            assert len(payment_hash) == 32, len(payment_hash)
            onchain_amount = data["expectedAmount"]
            assert isinstance(onchain_amount, int), type(onchain_amount)
            locktime = data["timeoutBlockHeight"]
            assert isinstance(locktime, int), type(locktime)
            lockup_address = data["address"]
            assert isinstance(lockup_address, str), type(lockup_address)
            assert bitcoin.is_address(lockup_address), lockup_address
            redeem_script = bytes.fromhex(data["redeemScript"])
        except Exception as e:
            self.logger.error(f"failed to parse response from swapserver for createnormalswap: {e!r}")
            raise SwapServerError("failed to parse response from swapserver for createnormalswap") from e
        del data   # parsing done
        # verify redeem_script is built with our pubkey and preimage
        _check_swap_scriptcode(
            redeem_script=redeem_script,
            lockup_address=lockup_address,
            payment_hash=payment_hash,
            locktime=locktime,
            refund_pubkey=refund_pubkey,
            claim_pubkey=None,
        )

        # check that onchain_amount is not more than what we estimated
        if onchain_amount > expected_onchain_amount_sat:
            raise Exception(f"fswap check failed: onchain_amount is more than what we estimated: "
                            f"{onchain_amount} > {expected_onchain_amount_sat}")
        # verify that they are not locking up funds for too long
        if locktime - self.network.get_local_height() > MAX_LOCKTIME_DELTA:
            raise Exception("fswap check failed: locktime too far in future")
        if self.network.blockchain().is_tip_stale():
            raise Exception("our blockchain tip is stale")

        swap, invoice, _ = self.add_normal_swap(
            redeem_script=redeem_script,
            locktime=locktime,
            lightning_amount_sat=lightning_amount_sat,
            onchain_amount_sat=onchain_amount,
            payment_hash=payment_hash,
            our_privkey=refund_privkey,
            prepay=False,
            channels=channels,
            # When the client is doing a normal swap, we create a ln-invoice with larger than usual final_cltv_delta.
            # If the user goes offline after broadcasting the funding tx (but before it is mined and
            # the server claims it), they need to come back online before the held ln-htlc expires (see #8940).
            # If the held ln-htlc expires, and the funding tx got confirmed, the server will have claimed the onchain
            # funds, and the ln-htlc will be timed out onchain (and channel force-closed). i.e. the user loses the swap
            # amount. Increasing the final_cltv_delta the user puts in the invoice extends this critical window.
            min_final_cltv_expiry_delta=MIN_FINAL_CLTV_DELTA_FOR_CLIENT,
        )
        return swap, invoice

    async def wait_for_htlcs_and_broadcast(
            self,
            *,
            transport: 'SwapServerTransport',
            swap: SwapData,
            invoice: str,
            tx: Transaction,
    ) -> Optional[str]:
        await transport.is_connected.wait()
        payment_hash = swap.payment_hash
        refund_pubkey = ECPrivkey(swap.privkey).get_public_key_bytes(compressed=True)
        async def callback(payment_hash):
            # FIXME what if this raises, e.g. TxBroadcastError?
            #       We will never retry the hold-invoice-callback.
            await self.broadcast_funding_tx(swap, tx)

        self.lnworker.register_hold_invoice(payment_hash, callback)

        # send invoice to server and wait for htlcs
        # note: server will link this RPC to our previous 'createnormalswap' RPC
        #       - using the RHASH from invoice, and using refundPublicKey
        #       - FIXME it would be safer to use a proper session-secret?!
        request_data = {
            "invoice": invoice,
            "refundPublicKey": refund_pubkey.hex(),
        }
        await transport.send_request_to_server('addswapinvoice', request_data)
        # wait for funding tx
        lnaddr = lndecode(invoice)
        while swap.funding_txid is None and not lnaddr.is_expired():
            await asyncio.sleep(0.1)
        return swap.funding_txid

    def create_funding_output(self, swap: SwapData) -> PartialTxOutput:
        return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)

    def create_funding_tx(
            self,
            swap: SwapData,
            tx: Optional[PartialTransaction],
            *,
            password,
    ) -> PartialTransaction:
        # create funding tx
        # use fee policy set by user (not using txbatcher)
        fee_policy = FeePolicy(self.config.FEE_POLICY)
        # note: rbf must not decrease payment
        # this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output
        if tx is None:
            funding_output = self.create_funding_output(swap)
            tx = self.wallet.make_unsigned_transaction(
                outputs=[funding_output],
                rbf=True,
                fee_policy=fee_policy,
            )
        else:
            tx.replace_output_address(DummyAddress.SWAP, swap.lockup_address)
            tx.set_rbf(True)
        self.wallet.sign_transaction(tx, password)
        return tx

    @log_exceptions
    async def request_swap_for_amount(
        self,
        *,
        transport: 'SwapServerTransport',
        onchain_amount: int,
    ) -> Optional[Tuple[SwapData, str]]:
        await self.is_initialized.wait()
        lightning_amount_sat = self.get_recv_amount(onchain_amount, is_reverse=False)
        if lightning_amount_sat is None:
            raise SwapServerError(_("Swap amount outside of providers limits") + ":\n"
                                  + _("min") + f": {self.get_min_amount()}\n"
                                  + _("max") + f": {self.get_provider_max_reverse_amount()}")
        swap, invoice = await self.request_normal_swap(
            transport=transport,
            lightning_amount_sat=lightning_amount_sat,
            expected_onchain_amount_sat=onchain_amount)
        return swap, invoice

    @log_exceptions
    async def broadcast_funding_tx(self, swap: SwapData, tx: Transaction) -> None:
        swap.funding_txid = tx.txid()
        await self.network.broadcast_transaction(tx)

    async def reverse_swap(
            self,
            *,
            transport: 'SwapServerTransport',
            lightning_amount_sat: int,
            expected_onchain_amount_sat: int,
            prepayment_sat: int,
            channels: Optional[Sequence['Channel']] = None,
            claim_to_output: Optional[TxOutput] = None,
    ) -> Optional[str]:
        """send on Lightning, receive on-chain

        - User generates preimage, RHASH. Sends RHASH to server.  (RPC 'createswap')
        - Server creates an LN invoice for RHASH.
        - User pays LN invoice - except server needs to hold the HTLC as preimage is unknown.
            - if the server requested a fee prepayment (using 'minerFeeInvoice'),
              the server will have the preimage for that. The user will send HTLCs for both the main RHASH,
              and for the fee prepayment. Once both MPP sets arrive at the server, the server will fulfill
              the HTLCs for the fee prepayment (before creating the on-chain output).
        - Server creates on-chain output locked to RHASH.
        - User spends on-chain output, revealing preimage.
        - Server fulfills HTLC using preimage.

        cltv safety requirement: (onchain_locktime < LN_locktime),   otherwise server is vulnerable

        Note: expected_onchain_amount_sat is BEFORE deducting the on-chain claim tx fee.
        Note: prepayment_sat is passed as argument instead of accessing self.mining_fee to ensure
        the mining fees the user sees in the GUI are also the values used for the checks performed here.
        We commit to prepayment_sat as it limits the max fee pre-payment amt, which the server is trusted with.
        """
        assert self.network
        assert self.lnwatcher
        self._sanity_check_swap_costs(
            incoming_sat=expected_onchain_amount_sat, outgoing_sat=lightning_amount_sat)
        privkey = os.urandom(32)
        our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True)
        preimage = os.urandom(32)
        payment_hash = sha256(preimage)
        request_data = {
            "type": "reversesubmarine",
            "pairId": "BTC/BTC",
            "invoiceAmount": lightning_amount_sat,
            "preimageHash": payment_hash.hex(),
            "claimPublicKey": our_pubkey.hex(),
        }
        self.logger.debug(f'rswap: sending request for {lightning_amount_sat}')
        data = await transport.send_request_to_server('createswap', request_data)
        try:
            invoice = data['invoice']
            assert isinstance(invoice, str), type(invoice)
            fee_invoice = data.get('minerFeeInvoice')
            assert fee_invoice is None or isinstance(fee_invoice, str), type(fee_invoice)
            lockup_address = data['lockupAddress']
            assert isinstance(lockup_address, str), type(lockup_address)
            assert bitcoin.is_address(lockup_address), lockup_address
            redeem_script = bytes.fromhex(data['redeemScript'])
            locktime = data['timeoutBlockHeight']
            assert isinstance(locktime, int), type(locktime)
            onchain_amount = data["onchainAmount"]
            assert isinstance(onchain_amount, int), type(onchain_amount)
            response_id = data['id']
        except Exception as e:
            self.logger.error(f"failed to parse response from swapserver for createswap: {e!r}")
            raise SwapServerError("failed to parse response from swapserver for createswap") from e
        del data  # parsing done
        self.logger.debug(f'rswap: {response_id=}')
        # verify redeem_script is built with our pubkey and preimage
        _check_swap_scriptcode(
            redeem_script=redeem_script,
            lockup_address=lockup_address,
            payment_hash=payment_hash,
            locktime=locktime,
            refund_pubkey=None,
            claim_pubkey=our_pubkey,
        )
        # check that the onchain amount is what we expected
        if onchain_amount < expected_onchain_amount_sat:
            raise Exception(f"rswap check failed: onchain_amount is less than what we expected: "
                            f"{onchain_amount} < {expected_onchain_amount_sat}")
        # verify that we will have enough time to get our tx confirmed
        if locktime - self.network.get_local_height() <= MIN_LOCKTIME_DELTA:
            raise Exception("rswap check failed: locktime too close")
        if self.network.blockchain().is_tip_stale():
            raise Exception("our blockchain tip is stale")
        # verify invoice payment_hash
        lnaddr = self.lnworker._check_bolt11_invoice(invoice)
        invoice_amount = int(lnaddr.get_amount_sat())
        if lnaddr.paymenthash != payment_hash:
            raise Exception("rswap check failed: inconsistent RHASH and invoice")
        if fee_invoice:
            fee_lnaddr = self.lnworker._check_bolt11_invoice(fee_invoice)
            if fee_lnaddr.get_amount_sat() > prepayment_sat:
                raise SwapServerError(_("Mining fee requested by swap-server larger "
                                        "than what was announced in their offer."))
            invoice_amount += fee_lnaddr.get_amount_sat()
            prepay_hash = fee_lnaddr.paymenthash
        else:
            prepay_hash = None
        # check that the lightning amount is what we requested
        if int(invoice_amount) != lightning_amount_sat:
            raise Exception(f"rswap check failed: invoice_amount ({invoice_amount}) "
                            f"not what we requested ({lightning_amount_sat})")
        # save swap data to wallet file
        swap = self.add_reverse_swap(
            redeem_script=redeem_script,
            locktime=locktime,
            privkey=privkey,
            preimage=preimage,
            payment_hash=payment_hash,
            prepay_hash=prepay_hash,
            onchain_amount_sat=onchain_amount,
            lightning_amount_sat=lightning_amount_sat,
            claim_to_output=claim_to_output,
        )
        # initiate fee payment.
        if fee_invoice:
            fee_invoice_obj = Invoice.from_bech32(fee_invoice)
            asyncio.ensure_future(self.lnworker.pay_invoice(fee_invoice_obj))
        # we return if we detect funding
        async def wait_for_funding(swap):
            while swap.funding_txid is None:
                await asyncio.sleep(0.1)
        # initiate main payment
        invoice_obj = Invoice.from_bech32(invoice)
        tasks = [asyncio.create_task(self.lnworker.pay_invoice(invoice_obj, channels=channels)), asyncio.create_task(wait_for_funding(swap))]
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        return swap.funding_txid

    def _add_or_reindex_swap(self, swap: SwapData, *, is_new: bool) -> None:
        with self.swaps_lock:
            assert is_new == (swap.payment_hash.hex() not in self._swaps), is_new
            if swap.payment_hash.hex() not in self._swaps:
                self._swaps[swap.payment_hash.hex()] = swap
            if swap._funding_prevout:
                self._swaps_by_funding_outpoint[swap._funding_prevout] = swap
            self._swaps_by_lockup_address[swap.lockup_address] = swap

    def server_update_pairs(self) -> None:
        """ for server """
        self.percentage = float(self.config.SWAPSERVER_FEE_MILLIONTHS) / 10000  # type: ignore
        self._min_amount = MIN_SWAP_AMOUNT_SAT
        oc_balance_sat: int = self.wallet.get_spendable_balance_sat()
        max_forward: int = min(int(self.lnworker.num_sats_can_receive()), oc_balance_sat, 10000000)
        max_reverse: int = min(int(self.lnworker.num_sats_can_send()), 10000000)
        self._max_forward: int = self._keep_leading_digits(max_forward, 2)
        self._max_reverse: int = self._keep_leading_digits(max_reverse, 2)
        new_mining_fee = self.get_fee_for_txbatcher()
        if self.mining_fee is None \
                or abs(self.mining_fee - new_mining_fee) / self.mining_fee > 0.1:
            self.mining_fee = new_mining_fee

    @staticmethod
    def _keep_leading_digits(num: int, digits: int) -> int:
        """Reduces precision of num to `digits` leading digits."""
        if num <= 0:
            return 0
        num_str = str(num)
        zeroed_num_str = f"{num_str[:digits]}{(len(num_str[digits:])) * '0'}"
        return int(zeroed_num_str)

    def update_pairs(self, pairs: SwapFees):
        self.logger.info(f'updating fees {pairs}')
        self.mining_fee = pairs.mining_fee
        self.percentage = pairs.percentage
        self._min_amount = pairs.min_amount
        self._max_forward = pairs.max_forward
        self._max_reverse = pairs.max_reverse
        self.trigger_pairs_updated_threadsafe()

    def trigger_pairs_updated_threadsafe(self):
        def trigger():
            self.is_initialized.set()
            self.pairs_updated.set()
            self.pairs_updated.clear()

        run_sync_function_on_asyncio_thread(trigger, block=True)

    def get_provider_max_forward_amount(self) -> int:
        """in sat"""
        return self._max_forward

    def get_provider_max_reverse_amount(self) -> int:
        """in sat"""
        return self._max_reverse

    def get_min_amount(self) -> int:
        """in satoshis"""
        return self._min_amount

    def check_invoice_amount(self, x, is_reverse: bool) -> bool:
        if is_reverse:
            max_amount = self.get_provider_max_forward_amount()
        else:
            max_amount = self.get_provider_max_reverse_amount()
        return self.get_min_amount() <= x <= max_amount

    def _get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
        """For a given swap direction and amount we send, returns how much we will receive.

        Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for.
        In the reverse direction, the result matches what the swap server returns as response["onchainAmount"].
        """
        if send_amount is None:
            return None
        x = Decimal(send_amount)
        percentage = Decimal(self.percentage)
        if is_reverse:
            if not self.check_invoice_amount(x, is_reverse):
                return None
            # see/ref:
            # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L948
            percentage_fee = math.ceil(percentage * x / 100)
            base_fee = self.mining_fee
            x -= percentage_fee + base_fee
            x = math.floor(x)
            if x < dust_threshold():
                return None
        else:
            x -= self.mining_fee
            percentage_fee = math.ceil(x * percentage / (100 + percentage))
            x -= percentage_fee
            if not self.check_invoice_amount(x, is_reverse):
                return None
        x = int(x)
        return x

    def _get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
        """For a given swap direction and amount we want to receive, returns how much we will need to send.

        Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for.
        In the forward direction, the result matches what the swap server returns as response["expectedAmount"].
        """
        if not recv_amount:
            return None
        x = Decimal(recv_amount)
        percentage = Decimal(self.percentage)
        if is_reverse:
            # see/ref:
            # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L928
            # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L958
            base_fee = self.mining_fee
            x += base_fee
            x = math.ceil(x / ((100 - percentage) / 100))
            if not self.check_invoice_amount(x, is_reverse):
                return None
        else:
            if not self.check_invoice_amount(x, is_reverse):
                return None
            # see/ref:
            # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L708
            # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/rates/FeeProvider.ts#L90
            percentage_fee = math.ceil(percentage * x / 100)
            x += percentage_fee + self.mining_fee
        x = int(x)
        return x

    def get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
        # first, add percentage fee
        recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse)
        # sanity check calculation can be inverted
        if recv_amount is not None:
            inverted_send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse)
            # accept off-by ones as amt_rcv = recv_amt(send_amt(amt_rcv)) only up to +-1
            if abs(send_amount - inverted_send_amount) > 1:
                raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. "
                                f"send_amount={send_amount} -> recv_amount={recv_amount} -> inverted_send_amount={inverted_send_amount}")
        # second, add on-chain claim tx fee
        if is_reverse and recv_amount is not None:
            recv_amount -= self.get_fee_for_txbatcher()
        return recv_amount

    def get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
        # first, add on-chain claim tx fee
        if is_reverse and recv_amount is not None:
            recv_amount += self.get_fee_for_txbatcher()
        # second, add percentage fee
        send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse)
        # sanity check calculation can be inverted
        if send_amount is not None:
            inverted_recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse)
            if recv_amount != inverted_recv_amount:
                raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. "
                                f"recv_amount={recv_amount} -> send_amount={send_amount} -> inverted_recv_amount={inverted_recv_amount}")
        return send_amount

    def get_swaps_by_funding_tx(self, tx: Transaction) -> Iterable[SwapData]:
        swaps = []
        for txout_idx, _txo in enumerate(tx.outputs()):
            prevout = TxOutpoint(txid=bytes.fromhex(tx.txid()), out_idx=txout_idx)
            if swap := self._swaps_by_funding_outpoint.get(prevout):
                swaps.append(swap)
        return swaps

    def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable[Tuple[int, SwapData]]:
        swaps = []
        for i, txin in enumerate(tx.inputs()):
            if swap := self.get_swap_by_claim_txin(txin):
                swaps.append((i, swap))
        return swaps

    def get_swap_by_claim_txin(self, txin: TxInput) -> Optional[SwapData]:
        return self._swaps_by_funding_outpoint.get(txin.prevout)

    def is_lockup_address_for_a_swap(self, addr: str) -> bool:
        return bool(self._swaps_by_lockup_address.get(addr))

    @classmethod
    def add_txin_info(cls, swap, txin: PartialTxInput) -> None:
        """Add some info to a claim txin.
        note: even without signing, this is useful for tx size estimation.
        """
        preimage = swap.preimage if swap.is_reverse else 0
        witness_script = swap.redeem_script
        txin.script_sig = b''
        txin.witness_script = witness_script
        sig_dummy = b'\x00' * 71  # DER-encoded ECDSA sig, with low S and low R
        witness = [sig_dummy, preimage, witness_script]
        txin.witness_sizehint = len(construct_witness(witness))
        txin.nsequence = 1 if swap.is_reverse else 0xffffffff - 2

    @classmethod
    def create_claim_txin(
            cls,
            *,
            txin: PartialTxInput,
            swap: SwapData,
    ) -> Tuple[PartialTxInput, Optional[int]]:
        if swap.is_reverse:  # successful reverse swap
            locktime = None
            # preimage will be set in sign_tx
        else:  # timing out forward swap
            locktime = swap.locktime
        cls.add_txin_info(swap, txin)
        txin.privkey = swap.privkey
        def make_witness(sig):
            # preimae not known yet
            preimage = swap.preimage if swap.is_reverse else 0
            witness_script = swap.redeem_script
            return construct_witness([sig, preimage, witness_script])
        txin.make_witness = make_witness
        return txin, locktime

    def client_max_amount_forward_swap(self) -> Optional[int]:
        """ returns None if we cannot swap """
        max_swap_amt_ln = self.get_provider_max_reverse_amount()
        if max_swap_amt_ln is None:
            return None
        max_recv_amt_ln = int(self.lnworker.num_sats_can_receive())
        max_amt_ln = int(min(max_swap_amt_ln, max_recv_amt_ln))
        max_amt_oc = self.get_send_amount(max_amt_ln, is_reverse=False) or 0
        min_amt_oc = self.get_send_amount(self.get_min_amount(), is_reverse=False) or 0
        return max_amt_oc if max_amt_oc >= min_amt_oc else None

    def client_max_amount_reverse_swap(self) -> Optional[int]:
        """Returns None if swap is not possible"""
        provider_max = self.get_provider_max_forward_amount()
        max_ln_send = int(self.lnworker.num_sats_can_send())
        max_swap_size = min(max_ln_send, provider_max)
        if max_swap_size < self.get_min_amount():
            return None
        return max_swap_size

    def server_create_normal_swap(self, request):
        # normal for client, reverse for server
        #request = await r.json()
        lightning_amount_sat = request['invoiceAmount']
        their_pubkey = bytes.fromhex(request['refundPublicKey'])
        assert len(their_pubkey) == 33
        swap = self.create_reverse_swap(
            lightning_amount_sat=lightning_amount_sat,
            their_pubkey=their_pubkey,
        )
        response = {
            "id": swap.payment_hash.hex(),
            'preimageHash': swap.payment_hash.hex(),
            "acceptZeroConf": False,
            "expectedAmount": swap.onchain_amount,
            "timeoutBlockHeight": swap.locktime,
            "address": swap.lockup_address,
            "redeemScript": swap.redeem_script.hex(),
        }
        return response

    def server_create_swap(self, request):
        # reverse for client, forward for server
        # requesting a normal swap (old protocol) will raise an exception
        #request = await r.json()
        req_type = request['type']
        assert request['pairId'] == 'BTC/BTC'
        if req_type == 'reversesubmarine':
            lightning_amount_sat=request['invoiceAmount']
            payment_hash=bytes.fromhex(request['preimageHash'])
            their_pubkey=bytes.fromhex(request['claimPublicKey'])
            assert len(payment_hash) == 32
            assert len(their_pubkey) == 33
            swap, invoice, prepay_invoice = self.create_normal_swap(
                lightning_amount_sat=lightning_amount_sat,
                payment_hash=payment_hash,
                their_pubkey=their_pubkey
            )
            response = {
                'id': payment_hash.hex(),
                'invoice': invoice,
                'minerFeeInvoice': prepay_invoice,
                'lockupAddress': swap.lockup_address,
                'redeemScript': swap.redeem_script.hex(),
                'timeoutBlockHeight': swap.locktime,
                "onchainAmount": swap.onchain_amount,
            }
        elif req_type == 'submarine':
            raise Exception('Deprecated API. Please upgrade your version of Electrum')
        else:
            raise Exception('unsupported request type:' + req_type)
        return response

    def get_groups_for_onchain_history(self):
        current_height = self.wallet.adb.get_local_height()
        d = {}
        with self.swaps_lock:
            swaps_items = list(self._swaps.items())
        for payment_hash_hex, swap in swaps_items:
            txid = swap.spending_txid if swap.is_reverse else swap.funding_txid
            if txid is None:
                continue

            if swap.is_reverse and swap.claim_to_output:
                group_label = 'Submarine Payment' + ' ' + self.config.format_amount_and_units(swap.claim_to_output[1])
            elif swap.is_reverse:
                group_label = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount)
            else:
                group_label = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount)

            label = _('Claim transaction') if swap.is_reverse else _('Funding transaction')
            delta = current_height - swap.locktime
            if self.wallet.adb.is_mine(swap.lockup_address):
                tx_height = self.wallet.adb.get_tx_height(swap.funding_txid)
                if swap.is_reverse and tx_height.height() <= 0:
                    label += ' (%s)' % _('waiting for funding tx confirmation')
                if not swap.is_reverse and not swap.is_redeemed and swap.spending_txid is None and delta < 0:
                    label += f' (refundable in {-delta} blocks)' # fixme: only if unspent
            d[txid] = {
                'group_id': txid,
                'label': label,
                'group_label': group_label,
            }
            if not swap.is_reverse:
                claim_tx = self.lnwatcher.adb.get_transaction(swap.spending_txid)
                if claim_tx and not self.extract_preimage(swap, claim_tx):
                    # if the spending_tx is in the wallet, this will add it
                    # to the group (see wallet.get_full_history)
                    d[swap.spending_txid] = {
                        'group_id': txid,
                        'group_label': group_label,
                        'label': _('Refund transaction'),
                    }
                    self.wallet._accounting_addresses.add(swap.lockup_address)
            elif swap.is_reverse and swap.claim_to_output:  # submarine payment
                claim_tx = self.lnwatcher.adb.get_transaction(swap.spending_txid)
                payee_spk = address_to_script(swap.claim_to_output[0])
                if claim_tx and payee_spk not in (o.scriptpubkey for o in claim_tx.outputs()):
                    # the swapserver must have refunded itself as the claim_tx did not spend
                    # to the address we intended it to spend to, remove the funding
                    # address again from accounting addresses so the refund tx is not incorrectly
                    # shown in the wallet history as tx spending from this wallet
                    self.wallet._accounting_addresses.discard(swap.lockup_address)
                # add the funding tx to the group as the total amount of the group would
                # otherwise be ~2x the actual payment as the claim tx gets counted as negative
                # value (as it sends from the wallet/accounting address balance)
                d[swap.funding_txid] = {
                     'group_id': txid,
                     'label': _('Funding transaction'),
                     'group_label': group_label,
                }
                # add the lockup_address as the claim tx would otherwise not touch the wallet and
                # wouldn't be shown in the history.
                self.wallet._accounting_addresses.add(swap.lockup_address)

        return d

    def get_group_id_for_payment_hash(self, payment_hash: bytes) -> Optional[str]:
        # add group_id to swap transactions
        swap = self.get_swap(payment_hash)
        if swap:
            return swap.spending_txid if swap.is_reverse else swap.funding_txid
        return None

    def get_pending_swaps(self) -> List[SwapData]:
        """Returns a list of swaps with unconfirmed funding tx (which require us to stay online)."""
        pending_swaps: List[SwapData] = []
        with self.swaps_lock:
            swaps = list(self._swaps.values())
        for swap in swaps:
            if swap.is_redeemed:
                # adb data might have been removed after is_redeemed was set.
                # in that case lnwatcher will no longer fetch the spending tx
                # and adb will return TX_HEIGHT_LOCAL
                continue
            # note: adb.get_tx_height returns TX_HEIGHT_LOCAL if the txid is unknown
            funding_height = self.lnworker.wallet.adb.get_tx_height(swap.funding_txid).height()
            spending_height = self.lnworker.wallet.adb.get_tx_height(swap.spending_txid).height()
            if funding_height > TX_HEIGHT_LOCAL and spending_height <= TX_HEIGHT_LOCAL:
                pending_swaps.append(swap)
        return pending_swaps


class SwapServerTransport(Logger):

    def __init__(self, *, config: 'SimpleConfig', sm: 'SwapManager'):
        Logger.__init__(self)
        self.sm = sm
        self.network = sm.network
        self.config = config
        self.is_connected = asyncio.Event()
        self.connect_timeout = 10 if self.uses_proxy else 5

    def __enter__(self):
        pass

    def __exit__(self, ex_type, ex, tb):
        pass

    async def __aenter__(self):
        pass

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def send_request_to_server(self, method: str, request_data: Optional[dict]) -> dict:
        """Might raise SwapServerError."""
        pass

    @property
    def uses_proxy(self):
        return self.network.proxy and self.network.proxy.enabled


class HttpTransport(SwapServerTransport):

    def __init__(self, config, sm):
        SwapServerTransport.__init__(self, config=config, sm=sm)
        self.api_url = config.SWAPSERVER_URL
        self.is_connected.set()

    def __enter__(self):
        asyncio.run_coroutine_threadsafe(self.get_pairs_just_once(), self.network.asyncio_loop)
        return self

    def __exit__(self, ex_type, ex, tb):
        pass

    async def __aenter__(self):
        asyncio.create_task(self.get_pairs_just_once())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def send_request_to_server(self, method, request_data):
        try:
            response = await self.network.async_send_http_on_proxy(
                'post' if request_data else 'get',
                self.api_url + '/' + method,
                json=request_data,
                timeout=30)
        except aiohttp.ClientError as e:
            self.logger.info(f"Swap server errored: {e!r}")
            raise SwapServerError() from e
        try:
            parsed_json = json.loads(response)
            if not isinstance(parsed_json, dict):
                raise Exception("malformed response, not dict")
        except Exception as e:
            self.logger.error(f"failed to parse response from swapserver for {method=}: {e!r}")
            raise SwapServerError(f"failed to parse response from swapserver for {method=}") from e
        return parsed_json

    async def get_pairs_just_once(self) -> None:
        """Might raise SwapServerError."""
        response = await self.send_request_to_server('getpairs', None)
        try:
            assert response.get('htlcFirst') is True
            fees = response['pairs']['BTC/BTC']['fees']
            limits = response['pairs']['BTC/BTC']['limits']
            pairs = SwapFees(
                percentage=fees['percentage'],
                mining_fee=fees['minerFees']['baseAsset']['mining_fee'],
                min_amount=limits['minimal'],
                max_forward=limits['max_forward_amount'],
                max_reverse=limits['max_reverse_amount'],
            )
        except Exception as e:
            self.logger.error(f"failed to parse response from swapserver for getpairs: {e!r}")
            raise SwapServerError("failed to parse response from swapserver for getpairs") from e
        self.sm.update_pairs(pairs)


class NostrTransport(SwapServerTransport):
    # uses nostr:
    #  - to advertise servers
    #  - for client-server RPCs (using DMs)
    #     (todo: we should use onion messages for that)

    EPHEMERAL_REQUEST = 25582
    USER_STATUS_NIP38 = 30315
    NOSTR_EVENT_VERSION = 5
    OFFER_UPDATE_INTERVAL_SEC = 60 * 10
    LIQUIDITY_UPDATE_INTERVAL_SEC = 30

    def __init__(self, config, sm, keypair: Keypair):
        SwapServerTransport.__init__(self, config=config, sm=sm)
        self._offers = {}  # type: Dict[str, SwapOffer]
        self.private_key = keypair.privkey
        self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex())
        self.nostr_pubkey = keypair.pubkey.hex()[2:]
        self.dm_replies = {}  # type: Dict[tuple[str, str], asyncio.Future[dict]]
        self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        self.relay_manager = None  # type: Optional[aionostr.Manager]
        self.taskgroup = OldTaskGroup()
        self._last_swapserver_relays = self._load_last_swapserver_relays()  # type: Optional[Sequence[str]]
        self._swap_server_requests = asyncio.Queue(maxsize=5)  # type: asyncio.Queue[dict]

    def __enter__(self):
        asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
        return self

    def __exit__(self, ex_type, ex, tb):
        fut = asyncio.run_coroutine_threadsafe(self.stop(), self.network.asyncio_loop)
        fut.result(timeout=5)

    async def __aenter__(self):
        asyncio.create_task(self.main_loop())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await wait_for2(self.stop(), timeout=5)

    @log_exceptions
    async def main_loop(self):
        self.logger.info(f'starting nostr transport with pubkey: {self.nostr_pubkey}')
        self.logger.info(f'nostr relays: {self.relays}')
        self.relay_manager = self.get_relay_manager()
        await self.relay_manager.connect()
        connected_relays = self.relay_manager.relays
        self.logger.info(f'connected relays: {[relay.url for relay in connected_relays]}')
        if connected_relays:
            self.is_connected.set()
        if self.sm.is_server:
            tasks = [
                self.check_direct_messages(),
                self._handle_requests(),
            ]
        else:
            tasks = [
                self.check_direct_messages(),
                self._get_pairs_loop(),
                self.update_relays()
            ]
        try:
            async with self.taskgroup as group:
                for task in tasks:
                    await group.spawn(task)
        except Exception as e:
            self.logger.exception("taskgroup died.")
        finally:
            self.logger.info("taskgroup stopped.")

    @log_exceptions
    async def stop(self):
        self.logger.info("shutting down nostr transport")
        self.sm.is_initialized.clear()
        self.is_connected.clear()
        await self.taskgroup.cancel_remaining()
        await self.relay_manager.close()
        self.logger.info("nostr transport shut down")

    @property
    def relays(self):
        our_relays = self.config.NOSTR_RELAYS.split(',') if self.config.NOSTR_RELAYS else []
        if self.sm.is_server:
            return our_relays
        last_swapserver_relays = self._last_swapserver_relays or []
        return list(set(our_relays + last_swapserver_relays))

    def get_relay_manager(self) -> aionostr.Manager:
        assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!"
        if not self.relay_manager:
            if self.uses_proxy:
                proxy = make_aiohttp_proxy_connector(self.network.proxy, self.ssl_context)
            else:
                proxy: Optional['ProxyConnector'] = None
            nostr_logger = self.logger.getChild('aionostr')
            nostr_logger.setLevel('INFO')  # DEBUG is very verbose with aionostr
            return aionostr.Manager(
                self.relays,
                private_key=self.nostr_private_key,
                log=nostr_logger,
                ssl_context=self.ssl_context,
                proxy=proxy,
                connect_timeout=self.connect_timeout
            )
        return self.relay_manager

    def get_offer(self, pubkey: str) -> Optional[SwapOffer]:
        return self._offers.get(pubkey)

    def get_recent_offers(self) -> Sequence[SwapOffer]:
        # filter to fresh timestamps
        now = int(time.time())
        recent_offers = [x for x in self._offers.values() if now - x.timestamp < 3600]
        # sort by proof-of-work
        recent_offers = sorted(recent_offers, key=lambda x: x.pow_bits, reverse=True)
        # cap list size
        recent_offers = recent_offers[:20]
        return recent_offers

    @ignore_exceptions
    @log_exceptions
    async def publish_offer(self, sm: 'SwapManager') -> None:
        assert self.sm.is_server
        if sm._max_forward < sm._min_amount and sm._max_reverse < sm._min_amount:
            self.logger.warning(f"not publishing swap offer, no liquidity available: {sm._max_forward=}, {sm._max_reverse=}")
            return
        offer = {
            'percentage_fee': sm.percentage,
            'mining_fee': sm.mining_fee,
            'min_amount': sm._min_amount,
            'max_forward_amount': sm._max_forward,
            'max_reverse_amount': sm._max_reverse,
            'relays': sm.config.NOSTR_RELAYS,
            'pow_nonce': hex(sm.config.SWAPSERVER_ANN_POW_NONCE),
        }
        # the first value of a single letter tag is indexed and can be filtered for
        tags = [['d', f'electrum-swapserver-{self.NOSTR_EVENT_VERSION}'],
                ['r', 'net:' + constants.net.NET_NAME],
                ['expiration', str(int(time.time() + self.OFFER_UPDATE_INTERVAL_SEC + 10))]]
        try:
            event_id = await aionostr._add_event(
                self.relay_manager,
                kind=self.USER_STATUS_NIP38,
                tags=tags,
                content=json.dumps(offer),
                private_key=self.nostr_private_key)
            self.logger.info(f"published offer {event_id}")
        except asyncio.TimeoutError as e:
            self.logger.warning(f"failed to publish swap offer: {str(e)}")

    @ignore_exceptions
    @log_exceptions
    async def send_direct_message(self, pubkey: str, content: str, *, retries: int = 0) -> Optional[str]:
        assert retries < 25, "Use a sane retry amount"
        our_private_key = aionostr.key.PrivateKey(self.private_key)
        recv_pubkey_hex = aionostr.util.from_nip19(pubkey)['object'].hex() if pubkey.startswith('npub') else pubkey
        encrypted_msg = our_private_key.encrypt_message(content, recv_pubkey_hex)
        try:
            event_id = await aionostr._add_event(
                self.relay_manager,
                kind=self.EPHEMERAL_REQUEST,
                content=encrypted_msg,
                private_key=self.nostr_private_key,
                tags=[['p', recv_pubkey_hex]],
            )
        except asyncio.TimeoutError:
            self.logger.warning(f"sending message to {pubkey} failed: timeout. {retries=}")
            if retries > 0:
                return await self.send_direct_message(pubkey, content, retries=retries-1)
            return None
        return event_id

    @log_exceptions
    async def send_request_to_server(self, method: str, request_data: dict) -> dict:
        self.logger.debug(f"swapserver req: method: {method} relays: {self.relays}")
        request_data['method'] = method
        server_npub = self.config.SWAPSERVER_NPUB
        server_pubkey = aionostr.util.from_nip19(server_npub)['object'].hex()
        event_id = await self.send_direct_message(server_pubkey, json.dumps(request_data), retries=1)
        if not event_id:
            raise SwapServerError()
        self.dm_replies[(server_pubkey, event_id)] = fut = asyncio.Future()
        response = await fut
        assert isinstance(response, dict)
        if 'error' in response:
            self.logger.warning(f"error from swap server [DO NOT TRUST THIS MESSAGE]: {response['error']}")
            raise SwapServerError()
        return response

    async def _get_pairs_loop(self):
        await self.is_connected.wait()
        query = {
            "kinds": [self.USER_STATUS_NIP38],
            "limit": 10,
            "#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
            "#r": [f"net:{constants.net.NET_NAME}"],
            "since": int(time.time()) - 60 * 60,
            "until": int(time.time()) + 60 * 60,
        }
        async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
            try:
                content = json.loads(event.content)
                if not isinstance(content, dict):
                    raise Exception("malformed content, not dict")
                tags = {k: v for k, v in event.tags}
            except Exception as e:
                self.logger.debug(f"failed to parse event: {e}")
                continue
            if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
                continue
            if tags.get('r') != f"net:{constants.net.NET_NAME}":
                continue
            if (event.created_at > int(time.time()) + 60 * 60
                    or event.created_at < int(time.time()) - 60 * 60):
                continue
            # check if this is the most recent event for this pubkey
            pubkey = event.pubkey
            prev_offer = self._offers.get(to_nip19('npub', pubkey))
            if prev_offer and event.created_at <= prev_offer.timestamp:
                continue
            try:
                pow_nonce = int(content.get('pow_nonce', "0"), 16)  # type: int
            except Exception:
                continue
            pow_bits = get_nostr_ann_pow_amount(bytes.fromhex(pubkey), pow_nonce)
            if pow_bits < self.config.SWAPSERVER_POW_TARGET:
                self.logger.debug(f"too low pow: {pubkey}: pow: {pow_bits} nonce: {pow_nonce}")
                continue
            try:
                server_relays = content['relays'].split(',')
            except Exception:
                continue
            try:
                pairs = SwapFees(
                    percentage=content['percentage_fee'],
                    mining_fee=content['mining_fee'],
                    min_amount=content['min_amount'],
                    max_forward=content['max_forward_amount'],
                    max_reverse=content['max_reverse_amount'],
                )
            except Exception:
                self.logger.debug(f"swap fees couldn't be parsed", exc_info=True)
                continue
            offer = SwapOffer(
                pairs=pairs,
                relays=server_relays[:10],
                timestamp=event.created_at,
                server_pubkey=pubkey,
                pow_bits=pow_bits,
            )
            self._offers[offer.server_npub] = offer
            if self.config.SWAPSERVER_NPUB == offer.server_npub:
                self.sm.update_pairs(pairs)
            trigger_callback('swap_offers_changed', self.get_recent_offers())
            # mirror event to other relays
            await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays))

    async def update_relays(self):
        """
        Update the relays when update_pairs is called.
        This ensures we try to connect to the same relays as the ones announced by the swap server.
        """
        while True:
            previous_relays = self._last_swapserver_relays
            await self.sm.pairs_updated.wait()
            if (conf_swapserver_offer := self._offers.get(self.config.SWAPSERVER_NPUB)) is None:
                self.logger.debug(
                    f"pairs updated but no pair for {self.config.SWAPSERVER_NPUB=} available? {self._offers=}",
                    stack_info=True,
                )
                continue
            latest_known_relays = conf_swapserver_offer.relays
            if latest_known_relays != previous_relays:
                self.logger.debug(f"swapserver relays changed, updating relay list.")
                # store the latest known relays to a file
                self._store_last_swapserver_relays(latest_known_relays)
                # update the relay manager
                await self.relay_manager.update_relays(self.relays)

    async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]):
        """If the relays of the origin server are different from our relays we rebroadcast the
        event to our relays so it gets spread more widely."""
        if not server_relays:
            return
        rebroadcast_relays = [relay for relay in self.relay_manager.relays if
                              relay.url not in server_relays]
        for relay in rebroadcast_relays:
            try:
                res = await relay.add_event(event, check_response=True)
            except Exception as e:
                self.logger.debug(f"failed to rebroadcast event to {relay.url}: {e}")
                continue
            self.logger.debug(f"rebroadcasted event to {relay.url}: {res}")

    @log_exceptions
    async def check_direct_messages(self):
        privkey = aionostr.key.PrivateKey(self.private_key)
        query = {"kinds": [self.EPHEMERAL_REQUEST], "limit":0, "#p": [self.nostr_pubkey]}
        async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
            try:
                content = privkey.decrypt_message(event.content, event.pubkey)
                content = json.loads(content)
                if not isinstance(content, dict):
                    raise Exception("malformed content, not dict")
            except Exception:
                continue
            content['event_id'] = event.id
            content['event_pubkey'] = event.pubkey
            if not self.sm.is_server and 'reply_to' in content:
                prev_event_id = content['reply_to']
                server_pubkey = event.pubkey
                fut = self.dm_replies.get((server_pubkey, prev_event_id))
                if fut:
                    fut.set_result(content)
            elif self.sm.is_server and 'method' in content:
                if self._swap_server_requests.full():
                    self.logger.warning(f"too many swap requests, dropping incoming request: {event.id[:10]}...")
                    continue
                await self._swap_server_requests.put(content)
            else:
                self.logger.info(f'unknown message {content}')

    @log_exceptions
    async def _handle_requests(self) -> None:
        assert self.sm.is_server
        while True:
            await asyncio.sleep(5)
            request = await self._swap_server_requests.get()
            event_id = request.pop('event_id')
            event_pubkey = request.pop('event_pubkey')
            try:
                method = request.pop('method')
                self.logger.info(f'handle_request: id={event_id} {method} {request}')
                if method == 'addswapinvoice':  # client-forward-swap phase2
                    r = self.sm.server_add_swap_invoice(request)
                elif method == 'createswap':  # client-reverse-swap
                    r = self.sm.server_create_swap(request)
                elif method == 'createnormalswap':  # client-forward-swap phase1
                    r = self.sm.server_create_normal_swap(request)
                else:
                    raise Exception(method)
                r['reply_to'] = event_id
                self.logger.debug(f'sending response id={event_id}')
                await self.taskgroup.spawn(self.send_direct_message(event_pubkey, json.dumps(r), retries=2))
            except Exception as e:
                self.logger.exception(f"failed to handle {request=}")
                error_response = json.dumps({
                    "error": f"Internal Server Error: {str(type(e))}",
                    "reply_to": event_id,
                })
                await self.taskgroup.spawn(self.send_direct_message(event_pubkey, error_response))

    def _store_last_swapserver_relays(self, relays: Sequence[str]):
        self._last_swapserver_relays = relays
        if not self.config.path or not relays:
            return
        storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
        try:
            with open(storage_path, 'w', encoding="utf-8") as f:
                json.dump(relays, f, indent=4, sort_keys=True)  # type: ignore
        except Exception:
            self.logger.exception(f"failed to write last swapserver relays to {storage_path}")

    def _load_last_swapserver_relays(self) -> Optional[Sequence[str]]:
        storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
        if not os.path.exists(storage_path):
            return None
        try:
            with open(storage_path, 'r', encoding="utf-8") as f:
                relays = json.load(f)
        except Exception:
            self.logger.exception(f"failed to read last swapserver relays from {storage_path}")
            return None
        return relays
